December 18, 2014
The parallel package merges multicore and snow: multicore functionality (forking) is available on Unix-like systems only, while snow functionality (clusters of worker processes) works on both Unix-like systems and Windows. Make the desired number of cores available to R:
require(doParallel)
# Determine number of CPU cores in your machine
nCores = detectCores()
# Create cluster with desired number of cores
cl = makeCluster(nCores)
# Register cluster
registerDoParallel(cl)
foreach parallelizes for loops; the parallel package does the same for the apply functions. Consider this simple example:
input = 1:100
input.list = as.list(input)
testFun = function(i){
mu = mean(runif(1e+06, 0, i))
return(mu)
}
(There is something wrong here… we'll come back to this.)
How would we do this with foreach?
system.time(
mu.foreach <- foreach(i=1:100,
.combine = "c") %dopar% {
testFun(i)
}
)
##    user  system elapsed 
##   0.104   0.011   3.311
Now let's try it with sapply and parSapply:
system.time( sapply(input, testFun) )
##    user  system elapsed 
##   5.535   0.205   5.972
system.time( parSapply(cl, input, testFun) )
##    user  system elapsed 
##   0.004   0.000   2.929
Now let's try it with lapply and parLapply:
system.time( lapply(input.list, testFun) )
##    user  system elapsed 
##   5.352   0.231   6.127
system.time( parLapply(cl, input.list, testFun) )
##    user  system elapsed 
##   0.003   0.000   3.343
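One gotcha with the cluster-based functions (parSapply, parLapply): the workers are separate R processes with empty workspaces, so any global variables or packages the function relies on must be shipped over explicitly with clusterExport() or clusterEvalQ(). A minimal sketch, using a made-up two-worker cluster cl2 and variable offset for illustration:

```r
library(parallel)

cl2 <- makeCluster(2)
# PSOCK workers start with empty workspaces: globals used by the
# function must be exported to them explicitly.
offset <- 100
clusterExport(cl2, "offset")
res <- parSapply(cl2, 1:4, function(i) i + offset)
stopCluster(cl2)
res  # 101 102 103 104
```

Forgetting the clusterExport() call produces an "object 'offset' not found" error on the workers.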
Now let's try it with mclapply:
# not available on Windows
system.time( mclapply(input.list, testFun, mc.cores=nCores) )
##    user  system elapsed 
##   5.577   0.708   3.611
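Unlike the cluster-based functions, mclapply works by forking, so on Unix-like systems the workers automatically inherit a copy of the master's workspace and no export step is needed. A minimal sketch illustrating this (the names big and nc are made up; the core count falls back to 1 on Windows, where forking is unavailable):

```r
library(parallel)

# Forked workers inherit the master's workspace, so 'big' is visible
# to the children without any clusterExport() step.
nc <- if (.Platform$OS.type == "unix") 2L else 1L
big <- 1:10
res <- mclapply(1:4, function(i) sum(big) + i, mc.cores = nc)
unlist(res)  # 56 57 58 59
```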
We can also parallelize vector map functions using pvec:
require(fields)
d = runif(5e+06, 0.1, 10)
system.time(Matern(d))
##    user  system elapsed 
##   3.700   0.207   4.396
system.time(pvec(d, Matern, mc.cores=nCores))
##    user  system elapsed 
##   4.091   0.585   2.640
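Note that pvec splits its input vector into chunks and calls the function once per chunk, so the function must itself be vectorized (as Matern is above). A minimal sketch of the semantics, with sqrt standing in for Matern (pvec also relies on forking, so the core count falls back to 1 on Windows):

```r
library(parallel)

# pvec() calls the (vectorized) function once per chunk of the input
# and concatenates the results, matching a single vectorized call.
nc <- if (.Platform$OS.type == "unix") 2L else 1L
x  <- 1:8
pv <- pvec(x, sqrt, mc.cores = nc)
all.equal(pv, sqrt(x))  # TRUE
```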
Let's go back and fix our first example. The problem was the random number generation: without explicitly assigning RNG streams to the workers, the parallel results are not reproducible, and the streams are not guaranteed to be independent. clusterSetRNGStream() distributes "L'Ecuyer-CMRG" streams across the cluster:
input = 1:100
testFun = function(i){ mean(runif(1e+06, 0, i)) }
clusterSetRNGStream(cl, iseed=0)
res1 = parSapply(cl, input, testFun)
clusterSetRNGStream(cl, iseed=0)
res2 = parSapply(cl, input, testFun)
identical(res1, res2)
## [1] TRUE
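clusterSetRNGStream() applies only to cluster workers. For the fork-based functions (mclapply, pvec), the analogous fix is to switch the master process to the "L'Ecuyer-CMRG" generator and call set.seed() before each run; mclapply then derives each worker's stream deterministically from the master seed. A sketch (names are illustrative; the core count falls back to 1 on Windows):

```r
library(parallel)

# With the "L'Ecuyer-CMRG" generator, mclapply draws reproducible
# per-worker streams from the master's seed.
RNGkind("L'Ecuyer-CMRG")
nc <- if (.Platform$OS.type == "unix") 2L else 1L
set.seed(0)
r1 <- mclapply(1:4, function(i) runif(1), mc.cores = nc)
set.seed(0)
r2 <- mclapply(1:4, function(i) runif(1), mc.cores = nc)
identical(r1, r2)  # TRUE
```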
run1 = function(...){
require(boot); require(splines)
load(url("http://www.stat.colostate.edu/~scharfh/CSP_parallel/data/arrivals_subset.RData"))
bikePred = function(data, indices){
d = data[indices,]
big.glm <- glm(arrivals ~
bs(hour, degree = 4)*weekend
+ bs(hour, degree = 4)*as.factor(id),
data = d, family = "poisson")
mynewdat = data.frame(weekend=FALSE, id=293, hour=17)
return(predict(object=big.glm, newdata=mynewdat, type="response"))
}
boot(data=arrivals.sub, statistic=bikePred, R=250)
}
system.time( bike.boot <- do.call(c, lapply(seq_len(nCores), run1)) )
##     user  system elapsed 
##  174.906  13.798 230.409
clusterSetRNGStream(cl, iseed=123)
system.time(
  bike.boot2 <- do.call(c, parLapply(cl, seq_len(nCores), run1))
)
##    user  system elapsed 
##   0.008   0.002 113.577
boot.ci(bike.boot2, type="perc")
## BOOTSTRAP CONFIDENCE INTERVAL CALCULATIONS
## Based on 1000 bootstrap replicates
## 
## CALL : 
## boot.ci(boot.out = bike.boot2, type = "perc")
## 
## Intervals : 
## Level     Percentile     
## 95%   (21.57, 25.13 )  
## Calculations and Intervals on Original Scale
Since the boot package has built-in parallel support, we could also simply run the following:
nBoot = nCores*250
set.seed(123, kind="L'Ecuyer")
system.time(
  # bikePred and arrivals.sub must exist in the workspace here
  bike.boot3 <- boot(data=arrivals.sub, statistic=bikePred, R=nBoot,
                     parallel="multicore", ncpus=nCores)
)
##     user  system elapsed 
##  186.661   9.374  72.778
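One caveat: parallel="multicore" is fork-based, so on Windows it falls back to serial execution; there, parallel="snow" with an explicit cluster is the portable choice. A minimal sketch on boot's built-in city data set (the ratio statistic and the two-worker cluster cl2 are made up for illustration; the bike model itself would need arrivals.sub loaded in the workspace):

```r
library(boot)
library(parallel)

# Simple ratio statistic on boot's built-in 'city' data.
ratio <- function(d, i) sum(d$x[i]) / sum(d$u[i])

cl2 <- makeCluster(2)
# Same reproducible-RNG fix as before, applied to the boot cluster.
clusterSetRNGStream(cl2, iseed = 123)
b <- boot(data = city, statistic = ratio, R = 200,
          parallel = "snow", ncpus = 2, cl = cl2)
stopCluster(cl2)
```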
It's a good idea to stop your cluster when you are done using it.
stopCluster(cl)